package com.bieber.computer.splitter;

import au.com.bytecode.opencsv.CSVReader;
import com.bieber.common.Constants;
import com.bieber.common.model.BizOrder;
import com.bieber.common.model.CacheValue;
import com.bieber.computer.ComputerConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;

import java.io.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by bieber on 2015/9/17.
 */
public class OrderSplitter extends DataSplitter<BizOrder> {

    private CSVReader reader;
    
    private AtomicLong cacheIndex = new AtomicLong(1);
    
    public OrderSplitter(ComputerConfig config, Ignite ignite) {
        super(config, ignite);
        if(StringUtils.isEmpty(config.getOrderCSVFile())){
            throw new IllegalArgumentException("please configure orderCSVFile property");
        }
        try {
            reader = new CSVReader(new InputStreamReader(new FileInputStream(new File(config.getOrderCSVFile()))));
        } catch (FileNotFoundException e) {
            throw new IllegalArgumentException("order csv file "+config.getOrderCSVFile()+" not exist",e);
        }
    }

    //csv id,personId,amount
    @Override
    protected void split() {
        initCache(Constants.ORDER_CACHE+cacheIndex.getAndIncrement(),BizOrder.class);
        for(int i=0;i<config.getPreNodeSize();i++){
            try {
                String[] data = reader.readNext();
                if(data==null){
                    isFinished=true;
                    break;
                }
                if(data.length!=3){
                    throw new IllegalArgumentException("order csv file column length must be 3");
                }
                BizOrder order = new BizOrder();
                order.setId(Integer.parseInt(data[0]));
                order.setPersonId(Integer.parseInt(data[1]));
                order.setAmount(Double.parseDouble(data[2]));
                dataCache.putIfAbsent(order.getId(),order);
            } catch (IOException e) {
                throw new IllegalStateException("failed to read csv file ",e);
            }
        }
    }
}
